/**
 * Copyright (C) 2010-2013 Alibaba Group Holding Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.rocketmq.common.message;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.alibaba.rocketmq.common.UtilAll;
import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;

/**
 * 消息解码
 * 
 * @author shijia.wxr<vintage.wang@gmail.com>
 */
public class MessageDecoder {
	/**
	 * 消息ID定长
	 */
	public final static int MSG_ID_LENGTH = 8 + 8;

	/**
	 * 存储记录各个字段位置
	 */
	public final static int MessageMagicCodePostion = 4;
	public final static int MessageFlagPostion = 16;
	public final static int MessagePhysicOffsetPostion = 28;
	public final static int MessageStoreTimestampPostion = 56;
	public final static int MessageMagicCode = 0xAABBCCDD ^ 1880681586 + 8;
	private final static String charset = "utf-8";

	public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
		input.flip();
		input.limit(MessageDecoder.MSG_ID_LENGTH);

		// 消息存储主机地址 IP PORT 8
		input.put(addr);
		// 消息对应的物理分区 OFFSET 8
		input.putLong(offset);

		return UtilAll.bytes2string(input.array());
	}

	public static String createMessageId(SocketAddress socketAddress, long transactionIdhashCode) {
		ByteBuffer byteBuffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
		InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
		byteBuffer.put(inetSocketAddress.getAddress().getAddress());
		byteBuffer.putInt(inetSocketAddress.getPort());
		byteBuffer.putLong(transactionIdhashCode);
		byteBuffer.flip();
		return UtilAll.bytes2string(byteBuffer.array());
	}

	public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
		SocketAddress address;
		long offset;

		// 地址
		byte[] ip = UtilAll.string2bytes(msgId.substring(0, 8));
		byte[] port = UtilAll.string2bytes(msgId.substring(8, 16));
		ByteBuffer bb = ByteBuffer.wrap(port);
		int portInt = bb.getInt(0);
		address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);

		// offset
		byte[] data = UtilAll.string2bytes(msgId.substring(16, 32));
		bb = ByteBuffer.wrap(data);
		offset = bb.getLong(0);

		return new MessageId(address, offset);
	}

	public static MessageExt decode(java.nio.ByteBuffer byteBuffer) {
		return decode(byteBuffer, true, true);
	}

	/**
	 * 客户端使用，SLAVE也会使用
	 */
	public static MessageExt decode(java.nio.ByteBuffer byteBuffer, final boolean readBody) {
		return decode(byteBuffer, readBody, true);
	}
	public static byte[] encode(MessageExt messageExt) throws Exception {
		byte[] body = messageExt.getBody();
		int bodyLength = messageExt.getBody().length;
		byte[] topics = messageExt.getTopic().getBytes(charset);
		byte topicLen = (byte) topics.length;
		String properties = messageProperties2String(messageExt.getProperties());
		byte[] propertiesBytes = properties.getBytes(charset);
		short propertiesLength = (short) propertiesBytes.length;
		final int msgLen = 4 // 1 TOTALSIZE
				+ 4 // 2 MAGICCODE
				+ 4 // 3 BODYCRC
				+ 4 // 4 QUEUEID
				+ 4 // 5 FLAG
				+ 8 // 6 QUEUEOFFSET
				+ 8 // 7 PHYSICALOFFSET
				+ 4 // 8 SYSFLAG
				+ 8 // 9 BORNTIMESTAMP
				+ 8 // 10 BORNHOST
				+ 8 // 11 STORETIMESTAMP
				+ 8 // 12 STOREHOSTADDRESS
				+ 4 // 13 RECONSUMETIMES
				+ 8 // 14 Prepared Transaction Offset
				+ 4 + bodyLength // 14 BODY
				+ 1 + topicLen // 15 TOPIC
				+ 2 + propertiesLength // 16 propertiesLength
				+ 0;
		ByteBuffer byteBuffer = ByteBuffer.allocate(msgLen);

		// 1 TOTALSIZE
		int storeSize = messageExt.getStoreSize();
		byteBuffer.putInt(storeSize);

		// 2 MAGICCODE
		byteBuffer.putInt(MessageMagicCode);

		// 3 BODYCRC
		int bodyCRC = messageExt.getBodyCRC();
		byteBuffer.putInt(bodyCRC);

		// 4 QUEUEID
		int queueId = messageExt.getQueueId();
		byteBuffer.putInt(queueId);

		// 5 FLAG
		int flag = messageExt.getFlag();
		byteBuffer.putInt(flag);

		// 6 QUEUEOFFSET
		long queueOffset = messageExt.getQueueOffset();
		byteBuffer.putLong(queueOffset);

		// 7 PHYSICALOFFSET
		long physicOffset = messageExt.getCommitLogOffset();
		byteBuffer.putLong(physicOffset);

		// 8 SYSFLAG
		int sysFlag = messageExt.getSysFlag();
		byteBuffer.putInt(sysFlag);

		// 9 BORNTIMESTAMP
		long bornTimeStamp = messageExt.getBornTimestamp();
		byteBuffer.putLong(bornTimeStamp);

		// 10 BORNHOST

		InetSocketAddress bornHost = (InetSocketAddress) messageExt.getBornHost();
		byteBuffer.put(bornHost.getAddress().getAddress());
		byteBuffer.putInt(bornHost.getPort());

		// 11 STORETIMESTAMP
		long storeTimestamp = messageExt.getStoreTimestamp();
		byteBuffer.putLong(storeTimestamp);

		// 12 STOREHOST

		InetSocketAddress serverHost = (InetSocketAddress) messageExt.getStoreHost();
		byteBuffer.put(serverHost.getAddress().getAddress());
		byteBuffer.putInt(serverHost.getPort());

		// 13 RECONSUMETIMES
		int reconsumeTimes = messageExt.getReconsumeTimes();
		byteBuffer.putInt(reconsumeTimes);

		// 14 Prepared Transaction Offset
		long preparedTransactionOffset = messageExt.getPreparedTransactionOffset();
		byteBuffer.putLong(preparedTransactionOffset);

		// 15 BODY
		byte[] newBody = body;
		if ((sysFlag & MessageSysFlag.CompressedFlag) == MessageSysFlag.CompressedFlag) {
			newBody = UtilAll.compress(body, 5);
		}

		byteBuffer.putInt(bodyLength);
		byteBuffer.put(newBody);

		// 16 TOPIC
		byteBuffer.put(topicLen);
		byteBuffer.put(topics);

		// 17 properties
		byteBuffer.putShort(propertiesLength);
		byteBuffer.put(propertiesBytes);

		// // 消息ID
		// ByteBuffer byteBufferMsgId = ByteBuffer.allocate(MSG_ID_LENGTH);
		// String msgId =
		// createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset());
		// msgExt.setMsgId(msgId);

		return byteBuffer.array();

	}

	public static MessageExt decode(java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody) {
		try {
			MessageExt msgExt = new MessageExt();

			// 1 TOTALSIZE
			int storeSize = byteBuffer.getInt();
			msgExt.setStoreSize(storeSize);

			// 2 MAGICCODE
			byteBuffer.getInt();

			// 3 BODYCRC
			int bodyCRC = byteBuffer.getInt();
			msgExt.setBodyCRC(bodyCRC);

			// 4 QUEUEID
			int queueId = byteBuffer.getInt();
			msgExt.setQueueId(queueId);

			// 5 FLAG
			int flag = byteBuffer.getInt();
			msgExt.setFlag(flag);

			// 6 QUEUEOFFSET
			long queueOffset = byteBuffer.getLong();
			msgExt.setQueueOffset(queueOffset);

			// 7 PHYSICALOFFSET
			long physicOffset = byteBuffer.getLong();
			msgExt.setCommitLogOffset(physicOffset);

			// 8 SYSFLAG
			int sysFlag = byteBuffer.getInt();
			msgExt.setSysFlag(sysFlag);

			// 9 BORNTIMESTAMP
			long bornTimeStamp = byteBuffer.getLong();
			msgExt.setBornTimestamp(bornTimeStamp);

			// 10 BORNHOST
			byte[] bornHost = new byte[4];
			byteBuffer.get(bornHost, 0, 4);
			int port = byteBuffer.getInt();
			msgExt.setBornHost(new InetSocketAddress(InetAddress.getByAddress(bornHost), port));

			// 11 STORETIMESTAMP
			long storeTimestamp = byteBuffer.getLong();
			msgExt.setStoreTimestamp(storeTimestamp);

			// 12 STOREHOST
			byte[] storeHost = new byte[4];
			byteBuffer.get(storeHost, 0, 4);
			port = byteBuffer.getInt();
			msgExt.setStoreHost(new InetSocketAddress(InetAddress.getByAddress(storeHost), port));

			// 13 RECONSUMETIMES
			int reconsumeTimes = byteBuffer.getInt();
			msgExt.setReconsumeTimes(reconsumeTimes);

			// 14 Prepared Transaction Offset
			long preparedTransactionOffset = byteBuffer.getLong();
			msgExt.setPreparedTransactionOffset(preparedTransactionOffset);

			// 15 BODY
			int bodyLen = byteBuffer.getInt();
			if (bodyLen > 0) {
				if (readBody) {
					byte[] body = new byte[bodyLen];
					byteBuffer.get(body);

					// uncompress body
					if (deCompressBody && (sysFlag & MessageSysFlag.CompressedFlag) == MessageSysFlag.CompressedFlag) {
						body = UtilAll.uncompress(body);
					}

					msgExt.setBody(body);
				} else {
					byteBuffer.position(byteBuffer.position() + bodyLen);
				}
			}

			// 16 TOPIC
			byte topicLen = byteBuffer.get();
			byte[] topic = new byte[(int) topicLen];
			byteBuffer.get(topic);
			msgExt.setTopic(new String(topic));

			// 17 properties
			short propertiesLength = byteBuffer.getShort();
			if (propertiesLength > 0) {
				byte[] properties = new byte[propertiesLength];
				byteBuffer.get(properties);
				String propertiesString = new String(properties, Charset.forName("UTF-8"));
				Map<String, String> map = string2messageProperties(propertiesString);
				msgExt.setProperties(map);
			}

			// 消息ID
			// MsgId长度为16字节
			// 1、8字节的消息存储本机的IP:PORT
			// 2、8个字节的该消息的物理偏移offset值构成
			ByteBuffer byteBufferMsgId = ByteBuffer.allocate(MSG_ID_LENGTH);
			String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset());
			msgExt.setMsgId(msgId);

			return msgExt;
		} catch (UnknownHostException e) {
			byteBuffer.position(byteBuffer.limit());
		} catch (BufferUnderflowException e) {
			byteBuffer.position(byteBuffer.limit());
		} catch (Exception e) {
			byteBuffer.position(byteBuffer.limit());
		}

		return null;
	}

	public static List<MessageExt> decodes(java.nio.ByteBuffer byteBuffer) {
		return decodes(byteBuffer, true);
	}

	/**
	 * 客户端使用
	 */
	public static List<MessageExt> decodes(java.nio.ByteBuffer byteBuffer, final boolean readBody) {
		List<MessageExt> msgExts = new ArrayList<MessageExt>();
		while (byteBuffer.hasRemaining()) {
			MessageExt msgExt = decode(byteBuffer, readBody);
			if (null != msgExt) {
				msgExts.add(msgExt);
			} else {
				break;
			}
		}
		return msgExts;
	}

	/**
	 * 序列化消息属性
	 */
	public static final char NAME_VALUE_SEPARATOR = 1;
	public static final char PROPERTY_SEPARATOR = 2;

	public static String messageProperties2String(Map<String, String> properties) {
		StringBuilder sb = new StringBuilder();
		if (properties != null) {
			for (final Map.Entry<String, String> entry : properties.entrySet()) {
				final String name = entry.getKey();
				final String value = entry.getValue();

				sb.append(name);
				sb.append(NAME_VALUE_SEPARATOR);
				sb.append(value);
				sb.append(PROPERTY_SEPARATOR);
			}
		}
		return sb.toString();
	}

	public static Map<String, String> string2messageProperties(final String properties) {
		Map<String, String> map = new HashMap<String, String>();
		if (properties != null) {
			String[] items = properties.split(String.valueOf(PROPERTY_SEPARATOR));
			if (items != null) {
				for (String i : items) {
					String[] nv = i.split(String.valueOf(NAME_VALUE_SEPARATOR));
					if (nv != null && 2 == nv.length) {
						map.put(nv[0], nv[1]);
					}
				}
			}
		}

		return map;
	}
}
